Amazon AthenaのIcebergテーブルのbucket関数で、ハッシュによるパーティション分割を利用してみる
データアナリティクス事業本部 機械学習チームの鈴木です。
Icebergテーブルではパーティション変換関数を使用して既存のカラムからパーティションキーを計算することが可能です。(hidden partitioning)
そのうちbucket
関数を使うと、設定だけでパーティションキーとしてハッシュ値を計算してくれて便利なので、実際に試してみました。
bucket関数を使うと嬉しいこと
カーディナリティの高いカラムをもとにパーティション分割したい場合に便利です。
このような場合、これまではAthenaのHiveのテーブルでは、例えば以下のようにする必要がありました。
- SQLでパーティション用に粒度を粗くしたカラムを新しく作り、それをパーティションキーとする。
- CTASでバケッティングを使う。
- パーティションキーを含むパスでS3にオブジェクトを配置し、injectionを使ってWhere句でオブジェクトキーを指定する。
1つ目のケースは、データ作成のための処理にパーティションキーを作成するための処理を入れ込む必要がありました。難しい処理ではないですが、システム的な処理をSQLに含めることになるので、例えば後から参加したメンバーがみたときになんのための処理なのか分からないかもしれません。
2つ目のケースは、バケッティングがCTASでのみサポートされるため、テーブル作成のたびにCTASをする必要がありました。全量洗い替えかスナップショットを作るような処理となるため、既存のテーブルにデータを追加したい場合には適していませんでした。
3つ目のケースは、高いカーディナリティのカラムをパーティションキーに指定できるものの、パーティション射影の型の仕様で複数のパーティションを跨いでスキャンができませんでした。また、S3はリクエストに料金がかかるため細かい粒度でオブジェクトを作ってしまうと課金が高くなってしまうリスクがありました。
bucket
関数を使うことでこのような心配をしなくても、Iceberg側の仕組みでハッシュ値を計算しパーティション分割のキーに使うことができます。
この記事では、例を通して挙動やメリットを確認できればと思います。
データの準備
2023/8/1から2023/8/9の間に温度を測定するデバイスで1日の平均温度を測定した
という想定で、ダミーのデータを作成しました。
例えば以下のようなものです。device_id
カラムはUUIDで、このIDでデバイスを識別する想定です。
device_id,measurement_date,average_tempelature 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230801,33.15517397540419 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230802,28.80116589256078 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230803,33.47565968592869 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230804,26.019512991684145 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230805,27.026381797648526 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230806,28.756432628583468 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230807,28.445851188379837 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230808,34.29138745895433 54093d70-b58a-4a64-bbdd-7e140ca834f4,20230809,27.409012824331818 bfbb1b80-47fd-4657-a599-230becb40852,20230801,26.223212581867145 bfbb1b80-47fd-4657-a599-230becb40852,20230802,26.658852480915485 bfbb1b80-47fd-4657-a599-230becb40852,20230803,31.92728480493228 bfbb1b80-47fd-4657-a599-230becb40852,20230804,29.085712467877148
このファイルをS3バケットにアップロードした後、以下のようにHive形式でAthenaからGlueテーブルを作成しました。
-- S3バケット名は自身のものに置き換えてください。 CREATE EXTERNAL TABLE `device_data_csv`( device_id string, data string, average_tempelature float) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://S3バケット名/device_data_csv' TBLPROPERTIES ( 'classification'='csv', 'columnsOrdered'='false', 'compressionType'='none', 'delimiter'=',', 'skip.header.line.count'='1')
なお、今回は120個のデバイスIDに関連するデータを生成しました。
やってみた
1. パーティション分割したオブジェクトの作成
まず、以下のようにIcebergの形式でテーブルを作成しました。
-- S3バケット名は自身のものに置き換えてください。 -- 下記ドキュメントのCREATE TABLE ステートメントの例を参考にしました -- https://docs.aws.amazon.com/ja_jp/athena/latest/ug/querying-iceberg-creating-tables.html CREATE TABLE device_data_iceberg ( device_id string, data string, average_tempelature float) PARTITIONED BY (bucket(16,device_id)) LOCATION 's3://S3バケット名/iceberg-device-data' TBLPROPERTIES ( 'table_type'='ICEBERG', 'format'='parquet', 'write_target_data_file_size_bytes'='536870912', 'optimize_rewrite_delete_file_threshold'='10' )
先ほど作ったHiveのテーブルから取得したデータを、このIcebergのテーブルに入れてみて、S3上にどのようにオブジェクトが作成されるか確認しました。
INSERT INTO device_data_iceberg (device_id,"data",average_tempelature) SELECT device_id,"data",average_tempelature FROM device_data_csv
S3バケットを確認すると、確かに以下のように16のフォルダに分けてオブジェクトが作成されていました。これはPARTITIONED BY
でbucket(16,device_id)
と指定したので意図通りですね。
試しに一つオブジェクトを確認してみると以下のようになっていました。
SQLからパーティションの情報を確認することも可能でした。
SELECT * FROM "device_data_iceberg$partitions";
2. 同一パーティション内でのデータスキャン量の確認
バケッティングの場合は一つのパーティション内でさらにスキャン量が削減されましたが、Icebergのbucket
関数の場合はそうはならないと考えられます。念の為確認してみました。
-- S3バケット名は自身のものに置き換えてください。 CREATE TABLE device_data_iceberg_1 ( device_id string, data string, average_tempelature float) PARTITIONED BY (bucket(1,device_id)) LOCATION 's3://S3バケット名/iceberg-device-data_1' TBLPROPERTIES ( 'table_type'='ICEBERG', 'format'='parquet', 'write_target_data_file_size_bytes'='536870912', 'optimize_rewrite_delete_file_threshold'='10' )
以下のようにデータを挿入します。これで一つのパーティションに全てのデータを配置しました。
INSERT INTO device_data_iceberg_1 (device_id,"data",average_tempelature) SELECT device_id,"data",average_tempelature FROM device_data_csv
まずデバイスIDを指定しない場合の検索です。6.63KBのスキャンでした。
次にデバイスIDを指定した場合の検索です。9.33KBのスキャンでした。むしろちょっと増えてしまいましたね。
想定どおり、パーティションに含まれるデータは全てスキャンされるようでした。
3. パーティションへのデータの追加
最後に、既存のパーティションにどのようにデータが追加されるのか確認しました。device_data_iceberg
テーブルに以下のように適当なデータを入れてみました。
INSERT INTO device_data_iceberg VALUES ('64093d70-b58a-4a64-ccdd-7e140ca834f7','20230801',27.758596)
パーティションの情報を確認しました。
SELECT * FROM "cm-nayuts-sample-db"."device_data_iceberg$partitions";
赤枠で囲ったパーティションにデータが追加されたことが分かりました。
最後に
Icebergテーブルのパーティション変換関数であるbucket
関数を使った、ハッシュ値でのパーティション分割の例を紹介しました。
テーブルの設定で簡単にハッシュによるパーティションキーの計算ができるのでとても便利でした。